[1]:
import pylab as pp
import numpy as np
import pandas as pd
import scipy.signal as scs
from scipy import integrate, interpolate, optimize
from scipy.integrate import odeint
from pylab import *
import netCDF4
from bokeh.models import ColumnDataSource, RangeTool, LinearAxis, Range1d
from bokeh.palettes import brewer, Inferno10
from bokeh.plotting import figure, show
from bokeh.layouts import column
from bokeh.io import output_notebook
output_notebook()
Dados Reais¶
Dados referentes a cidade de Londres entre 1944 e 1964. População de aproximadamente 2.5 milhões de habitantes na época.
[2]:
# Lendo o arquivo de dados no formato 'filename.csv'
data = pd.read_csv("./PG_IMT/DadosEpidemia/UKCities/London.csv")
# Preview das cinco primeiras linhas
data.head()
s_array = data[["cases", "births", "pop","time"]].to_numpy()
Id = s_array[:,0]
Bd = s_array[:,1]
Sd = s_array[:,2]
t = s_array[:,3]
Visualizando os dados¶
[3]:
TOOLS="zoom_in,zoom_out,save"
p1 = figure(tools=TOOLS, plot_width=600, plot_height=300)
p2 = figure(tools=TOOLS, plot_width=600, plot_height=300)
p3 = figure(tools=TOOLS, plot_width=600, plot_height=300)
p1.line(data["time"], data["births"], line_width=4, color="#8e44ad", line_alpha=0.9)
p1.grid.grid_line_alpha = 0
p1.ygrid.band_fill_color = "olive"
p1.ygrid.band_fill_alpha = 0.1
p1.yaxis.axis_label = "Nascimentos"
p1.xaxis.axis_label = "Data"
p2.line(data["time"], data["pop"], line_width=4, color="#8e44ad", line_alpha=0.9)
p2.grid.grid_line_alpha = 0
p2.ygrid.band_fill_color = "olive"
p2.ygrid.band_fill_alpha = 0.1
p2.yaxis.axis_label = "População"
p2.xaxis.axis_label = "Data"
p3.line(data["time"], data["cases"], line_width=4, color="#8e44ad", line_alpha=0.9)
p3.grid.grid_line_alpha = 0
p3.ygrid.band_fill_color = "olive"
p3.ygrid.band_fill_alpha = 0.1
p3.yaxis.axis_label = "Casos"
p3.xaxis.axis_label = "Data"
show(column(p1,p2,p3))
Selecionando um ano específico para análise¶
[4]:
selected_year = 1948
is_1948 = (data['time'].astype(int) == selected_year)
LD48 = data[is_1948]
LD48.head()
p = figure(tools=TOOLS, plot_width=600, plot_height=300)
p.line(LD48["time"], LD48["cases"], line_width=4, color="#8e44ad", line_alpha=0.9)
p.grid.grid_line_alpha = 0
p.ygrid.band_fill_color = "olive"
p.ygrid.band_fill_alpha = 0.1
p.yaxis.axis_label = "Casos"
p.xaxis.axis_label = "Data"
show(p)
Os dados¶
Conhecemos o número de casos, os nascimentos e a população total. Podemos determinar os demais valores a partir da relação \(S(t)+I(t)+R(t)=N\) onde N vamos considerar a população.
[5]:
s_array = LD48[["cases", "births", "pop","time"]].to_numpy()
Id = s_array[:,0]
Bd = s_array[:,1]
Sd = s_array[:,2] - Id + Bd
t = s_array[:,3]
O problema¶
O conjunto de equações diferenciais que caracteriza o modelo é descrito abaixo. No modelo \(\beta\) - representa a taxa de transmissão ou taxa efetiva de contato e \(r\) - a taxa de remoção ou recuperação.
Gostaríamos de identificar quais parâmetros \(\beta\) e \(r\) resultam num melhor ajuste do modelo para os dados de S, I e R
[6]:
def SIRmodel(y, t, Beta, r):
S, I = y
Sdot = -Beta * S * I
Idot = Beta * S * I - r * I
return Sdot, Idot
# Resolução da simulação - Escala temporal (dias)
Obtendo \(y_s(\theta,k) = [S \; I \: R]\)¶
O trecho a seguir retorna os valores sintetizados \(y_s(\theta,k) = [S \; I \: R]\) representa o dado sintetizado a partir de um modelo sintetizado para uma determinada amostra \(k\) e \(\theta\) representa o vetor ed parâmetros \(\theta = [ \beta \; \; r]^T\). A partir de uma condição inicial \(y_0\).
[7]:
def SIRsim(y0, t, theta):
Beta, r = theta[0], theta[1]
ret = integrate.odeint(SIRmodel, y0, t, args=(Beta,r))
S, I = ret.T
return S, I
Condições inicias - \(y_0\) e \(\theta_0\)¶
[8]:
# Valores iniciais
I0 = Id[0]
S0 = Sd[0]
# Vetor de condições iniciais
y0 = S0, I0
# Beta - taxa de contato
# r - taxa média de recuperação (in 1/dia).
# theta = [Beta, r]
theta0 = [1e-8, 1e-1] # valores iniciais
# Definição do conjunto de equações diferencias não lineares que formam o modelo.
t = (t - 1948) * 365
Estimação de parâmetros¶
Para estimarmos os parâmetros do modelo \(\mathbf{\beta}\) e \(\mathbf{r}\), vamos utilizar inicialmente o método de mínimos quadrados. Podemos então formular o problema a partir da Equação abaixo. Na Equação \(y_m(k)\) representa o dado real em cada amostra \(k\); \(y_s(\theta,k)\) representa o valor estimado a partir da simulação do modelo para uma determinada amostra \(k\) e \(\theta\) representa o vetor ed parâmetros \(\theta = [ \beta \; \; r]^T\).
A equação formula a pergunta: quais os valores de \(beta\) e \(r\) que minizam o erro quadrático quando comparados com os dados reais.
[9]:
def ErroQuadratico(Sd,Id,y0,t,theta0,w):
""" function to pass to optimize.leastsq
The routine will square and sum the values returned by
this function"""
[S,I] = SIRsim(y0,t,theta0)
erroS = w[0] * (S - Sd)
erroI = w[1] * (I - Id)
EQ = np.concatenate([erroS,erroI])
if sum(np.isnan(EQ)) >= 1:
print("Error!!!")
return EQ
def objetivo(p, S, I, y0, t, w):
return ErroQuadratico(S,I,y0,t,p,w)
Minimização da função custo¶
[10]:
# Criação das ponderações dos erros
wS = max(Id) / max(Sd)
w = [wS, 1]
(c,kvg) = optimize.leastsq(objetivo, theta0, args=(Sd, Id, y0, t, w))
print(c)
[2.10765916e-08 6.83427510e-02]
Visualização¶
[11]:
[Sa,Ia] = SIRsim(y0,t,c)
p = figure(tools=TOOLS, y_range=(0,5000), plot_width=600, plot_height=400)
p.scatter(t, Sd, legend_label="Suscetíveis - dados", size=8, fill_color="#ffd885", fill_alpha=0.7, line_alpha=0)
p.scatter(t, Id, legend_label="Infectados - dados", size=8, fill_color="#de425b", fill_alpha=0.7, line_alpha=0)
p.line(t, Sa, legend_label="Suscetíveis", line_width=4, color="#f9bd3d", line_cap='round', line_alpha=0.9)
p.line(t, Ia, legend_label="Infectados", line_width=4, color="#f4193c", line_cap='round', line_alpha=0.9)
p.grid.grid_line_alpha = 0
p.ygrid.band_fill_color = "olive"
p.ygrid.band_fill_alpha = 0.1
p.yaxis.axis_label = "Indivíduos"
p.xaxis.axis_label = "Dias"
show(p)
Reamostrando os dados¶
[12]:
Sd_res, t_res = scs.resample(Sd, 365, t=t)
Id_res, t_res = scs.resample(Id, 365, t=t)
p = figure(tools=TOOLS, y_range=(0,4000), plot_width=600, plot_height=400)
p.scatter(t, Id, legend_label="Infectados", size=10, fill_color="#f4193c", fill_alpha=0.6, line_alpha=0)
p.scatter(t_res, Id_res, legend_label="Infectados - Resample", size=4, fill_color="#8e44ad", line_alpha=0)
p.grid.grid_line_alpha = 0
p.ygrid.band_fill_color = "olive"
p.ygrid.band_fill_alpha = 0.1
p.yaxis.axis_label = "Indivíduos"
p.xaxis.axis_label = "Dias"
show(p)
Restimando os parâmetros¶
[13]:
(c, kvg) = optimize.leastsq(objetivo, theta0, args=(Sd_res, Id_res, y0, t_res, w))
print(c)
[Sa,Ia] = SIRsim(y0, t_res, c)
p.line(t_res, Ia, legend_label="Infectados - Modelo", line_width=4, color="#f4193c", line_cap='round', line_alpha=0.9)
p.grid.grid_line_alpha = 0
p.ygrid.band_fill_color = "olive"
p.ygrid.band_fill_alpha = 0.1
p.yaxis.axis_label = "Indivíduos"
p.xaxis.axis_label = "Dias"
show(p)
[2.15762357e-07 6.89608748e-01]
Outros métodos de estimação¶
[14]:
import warnings
warnings.filterwarnings("error")
from scipy.optimize import dual_annealing, shgo, basinhopping, differential_evolution
def cost_function(theta0,Sd,Id,y0,t,w):
""" function to pass to optimize.leastsq
The routine will square and sum the values returned by
this function"""
try:
[S, I] = SIRsim(y0, t, theta0)
erroS = (w[0]*(S - Sd)**2)
erroI = (w[1]*(I - Id)**2)
final_error = sum(erroI)
except:
final_error = 10**14
return final_error
beta_approx = 1 / data["pop"].max()
r_approx = 1 / 365
x0 = [beta_approx, r_approx]
lower, upper = [x0[0]/10, x0[1]/10], [10*x0[0], 1000*x0[1]]
bounds = zip(lower, upper)
#res = dual_annealing(cost_function, x0=x0, maxiter=2000, bounds=list(bounds), args=(Sd_res, Id_res, y0, t_res, w))
#res = shgo(cost_function, bounds=list(bounds), args=(Sd_res, Id_res, y0, t_res, w))
#res = basinhopping(cost_function, x0=x0, niter=200, minimizer_kwargs=dict(args=(Sd_res, Id_res, y0, t_res, w)))
res = differential_evolution(cost_function, list(bounds), args=(Sd_res, Id_res, y0, t_res, w))
res
[14]:
fun: 43830917.444261
jac: array([2.89188102e+17, 3.06168899e+07])
message: 'Optimization terminated successfully.'
nfev: 906
nit: 26
success: True
x: array([2.16790744e-07, 6.92576449e-01])
[15]:
[Sa,Ia] = SIRsim(y0, t_res, res.x)
p = figure(tools=TOOLS, y_range=(0,4000), plot_width=600, plot_height=400)
p.scatter(t, Id, legend_label="Infectados", size=10, fill_color="#f4193c", fill_alpha=0.6, line_alpha=0)
p.scatter(t_res, Id_res, legend_label="Infectados - Resample", size=4, fill_color="#8e44ad", line_alpha=0)
p.line(t_res, Ia, legend_label="Infectados - Modelo", line_width=4, color="#f4193c", line_cap='round', line_alpha=0.9)
p.grid.grid_line_alpha = 0
p.ygrid.band_fill_color = "olive"
p.ygrid.band_fill_alpha = 0.1
p.yaxis.axis_label = "Indivíduos"
p.xaxis.axis_label = "Dias"
show(p)
Análise anual¶
[16]:
years = data["time"].astype(int).unique().tolist()
p = figure(tools=TOOLS+",hover",
x_range=(min(years), max(years)),
plot_width=600, plot_height=300)
p.line(data["time"], data["cases"],
legend_label="Casos", line_width=4, color="#f4511e", line_cap='round', line_alpha=0.9)
p.grid.grid_line_alpha = 0
p.ygrid.band_fill_color = "olive"
p.ygrid.band_fill_alpha = 0.1
p.yaxis.axis_label = "Indivíduos"
p.xaxis.axis_label = "Dias"
p.toolbar.autohide = True
show(p)
Detecção dos períodos de contágio¶
[17]:
from PyAstronomy import pyasl
# Filtrando os dados para o processo de derivação
filt_cases = pyasl.smooth(data["cases"], 13, 'hamming')
# Incluindo os dados filtrados no plot
p.line(data["time"], filt_cases, line_dash="dotted",
legend_label="Casos Filtrado", line_width=3, color="#8e44ad", line_cap='round', line_alpha=0.9)
show(p)
[18]:
cases_variation = np.diff(filt_cases) # Calculando a taxa de variação
threshold = np.std(cases_variation) # Calculando o threshold baseado no desvio
# padrão da taxa de variação
threshold_vec = [threshold for k in cases_variation]
p = figure(tools=TOOLS+",hover",
plot_width=600, plot_height=300)
p.line(data["time"][:-1], cases_variation,
legend_label="Derivada dos casos", line_width=4, color="#f4511e", line_cap='round', line_alpha=0.9)
p.line(data["time"][:-1], threshold_vec,
legend_label="Threshold", line_width=4, color="#8e44ad", line_cap='round', line_alpha=0.9)
p.grid.grid_line_alpha = 0
p.ygrid.band_fill_color = "olive"
p.ygrid.band_fill_alpha = 0.1
p.yaxis.axis_label = "Indivíduos"
p.xaxis.axis_label = "Dias"
p.toolbar.autohide = True
show(p)
[19]:
def findEpidemyBreaks(cases, threshold_prop, cases_before=10):
"""
"""
# Filtering the data
filt_cases = pyasl.smooth(cases, 11, 'hamming')
# Compute the derivative and standard deviation
cases_variation = np.diff(filt_cases).tolist()
threshold = threshold_prop * np.std(cases_variation)
start_points, end_points = [], []
in_epidemy, out_epidemy = False, False
for k, value in enumerate(cases_variation):
if not in_epidemy:
# Check value
if value > threshold:
in_epidemy = True
# Find the start point
start_index = 0 if k-cases_before < 0 else k-cases_before
window = [abs(v) for v in cases_variation[start_index:k]]
ref_index = window.index(min(window))
start_points.append(k - (cases_before - ref_index))
else:
check_1 = (cases_variation[k-1] < 0)
check_2 = (value >= 0)
if check_1 and check_2:
in_epidemy = False
end_points.append(k)
return start_points, end_points
[20]:
start, end = findEpidemyBreaks(data["cases"], 1, 12)
p = figure(tools=TOOLS+",hover",
x_range=(min(years), max(years)),
y_range=(0, 8000),
plot_width=600, plot_height=400)
p.line(data["time"], data["cases"],
legend_label="Casos", line_width=4, color="#f4511e", line_cap='round', line_alpha=0.9)
windowed_data = { "I":[], "S":[], "B":[], "t":[] }
for s, e in zip(start, end):
windowed_data["B"].append(data["births"][s:e].to_numpy())
windowed_data["S"].append(data["pop"][s:e].to_numpy())
windowed_data["I"].append(data["cases"][s:e].to_numpy())
windowed_data["t"].append(data["time"][s:e].to_numpy())
p.line(windowed_data["t"][-1], windowed_data["I"][-1],
legend_label="Casos na Epidemia", line_width=3, color="#8e44ad", line_cap='round', line_alpha=0.9)
p.line([data["time"][s], data["time"][s]], [0, 10000], line_dash="dotted",
legend_label="Início da Epidemia", line_width=1, color="#455a64", line_cap='round', line_alpha=0.9)
p.grid.grid_line_alpha = 0
p.ygrid.band_fill_color = "olive"
p.ygrid.band_fill_alpha = 0.1
p.yaxis.axis_label = "Indivíduos"
p.xaxis.axis_label = "Ano"
p.toolbar.autohide = True
index = 5
days = (windowed_data["t"][index] - windowed_data["t"][index][0]) * 365
p1 = figure(tools=TOOLS+",hover",
plot_width=600, plot_height=400)
p1.line(days, windowed_data["I"][index],
legend_label="Casos", line_width=4, color="#8e44ad", line_cap='round', line_alpha=0.9)
p1.grid.grid_line_alpha = 0
p1.ygrid.band_fill_color = "olive"
p1.ygrid.band_fill_alpha = 0.1
p1.yaxis.axis_label = "Indivíduos"
p1.xaxis.axis_label = "Dias"
p1.toolbar.autohide = True
show(column(p,p1))
[21]:
final_data = {
"data": {
"original": [],
"resampled": [],
"simulated": []
},
"pars": {
"beta": [],
"r": []
}
}
zipped_data = zip(
windowed_data["S"],
windowed_data["I"],
windowed_data["B"],
windowed_data["t"])
for S, I, B, t in zipped_data:
year_ref = t[0]
S = S - I + B # Calculando os suscetiveis
t = (t - year_ref) * 365 # Tempo em dias
# Condições iniciais
y0 = S[0], I[0]
theta0 = [1e-8, 1e-1]
# Criando a ponderação
# dos erros
wS = max(I) / max(S)
w = [wS, 1]
# Reamostrando os dados
Sd_res, t_res = scs.resample(S, int(t[-1]), t=t)
Id_res, t_res = scs.resample(I, int(t[-1]), t=t)
# Estimando os parâmetros
bounds = zip([x0[0]/10, x0[1]/10], [10*x0[0], 1000*x0[1]])
res = differential_evolution(cost_function, list(bounds), args=(Sd_res, Id_res, y0, t_res, w))
# Simulando os dados
[Sa, Ia] = SIRsim(y0, t_res, res.x)
# Save the year data
final_data["data"]["original"].append(
{ "I": I, "B": B, "S": S, "t": t/365 + year_ref })
final_data["data"]["resampled"].append(
{"I": Id_res, "S": Sd_res, "t": t_res/365 + year_ref})
final_data["data"]["simulated"].append(
{"I": Ia, "S": Sa, "t": t_res/365 + year_ref})
final_data["pars"]["beta"].append( res.x[0] )
final_data["pars"]["r"].append( res.x[1] )
[22]:
r = final_data["pars"]["r"]
beta = final_data["pars"]["beta"]
years = [int(t[0]) for t in windowed_data["t"]]
p = figure(tools=TOOLS+",hover",
y_range=(min(beta), max(beta)),
plot_width=600, plot_height=300)
p.line(years, beta,
legend_label="beta", line_width=4, color="#c2185b", line_cap='round', line_alpha=0.9)
p.extra_y_ranges = {"r_axis": Range1d(start=min(r), end=max(r))}
p.add_layout(LinearAxis(y_range_name="r_axis"), 'left')
p.line(years, r, y_range_name="r_axis", line_dash='dashed',
legend_label="r", line_width=3, color="#8e44ad", line_cap='round', line_alpha=0.9)
p.grid.grid_line_alpha = 0
p.ygrid.band_fill_color = "olive"
p.ygrid.band_fill_alpha = 0.1
p.xaxis.axis_label = "Ano"
p.toolbar.autohide = True
p1 = figure(tools=TOOLS+",hover",
x_range=p.x_range,
plot_width=600, plot_height=300)
p1.line(data["time"], data["cases"],
legend_label="Casos", line_width=2, color="#f4511e", line_cap='round', line_alpha=0.9)
for dataset in final_data["data"]["original"]:
p1.line(dataset["t"], dataset["I"],
legend_label="Casos", line_width=4, color="#f4511e", line_cap='round', line_alpha=0.9)
for dataset in final_data["data"]["simulated"]:
p1.line(dataset["t"], dataset["I"], line_dash='dashed',
legend_label="Estimado", line_width=3, color="#0288d1", line_cap='round', line_alpha=0.9)
p1.grid.grid_line_alpha = 0
p1.ygrid.band_fill_color = "olive"
p1.ygrid.band_fill_alpha = 0.1
p1.yaxis.axis_label = "Indivíduos"
p1.xaxis.axis_label = "Ano"
p1.toolbar.autohide = True
show(column(p,p1))
[23]:
source = ColumnDataSource(data=dict(date=data["time"].to_numpy(), value=data["cases"].to_numpy()))
p = figure(plot_height=300, plot_width=600, tools="xpan", toolbar_location=None,
x_axis_location="above", x_range=(data["time"][5], data["time"][105]))
p.line('date', 'value', source=source,
legend_label="Casos", line_width=2, color="#f4511e", line_cap='round', line_alpha=0.9)
for dataset in final_data["data"]["resampled"]:
rsource = ColumnDataSource(data=dict(date=dataset["t"], value=dataset["I"]))
p.scatter('date', 'value', source=rsource,
legend_label="Casos reamostrados", fill_color="#8e44ad", size=4, line_alpha=0)
p.yaxis.axis_label = 'Indivíduos'
p.grid.grid_line_alpha = 0
p.xgrid.band_fill_color = "navy"
p.xgrid.band_fill_alpha = 0.1
p.toolbar.autohide = True
select = figure(title="Drag the middle and edges of the selection box to change the range above",
plot_height=130, plot_width=600, y_range=p.y_range,
y_axis_type=None, tools="", toolbar_location=None)
range_tool = RangeTool(x_range=p.x_range)
range_tool.overlay.fill_color = "navy"
range_tool.overlay.fill_alpha = 0.2
select.line('date', 'value', source=source,
line_width=2, color="#f4511e", line_cap='round', line_alpha=0.9)
select.ygrid.grid_line_color = None
select.add_tools(range_tool)
select.toolbar.active_multi = range_tool
select.xaxis.axis_label = 'Ano'
show(column(p, select))
[24]:
p1 = figure(plot_height=300, plot_width=600, tools="xpan", toolbar_location=None,
x_axis_location="above", x_range=p.x_range)
p1.line('date', 'value', source=source,
legend_label="Casos", line_width=2, color="#f4511e", line_cap='round', line_alpha=0.9)
for dataset in final_data["data"]["simulated"]:
psource = ColumnDataSource(data=dict(date=dataset["t"], value=dataset["I"]))
p1.line('date', 'value', source=psource, line_dash="dashed",
legend_label="Modelo estimado", color="#0288d1",
line_width=4, line_cap='round', line_alpha=0.9)
window_bound_lower = [dataset["t"][0], dataset["t"][0]]
window_bound_upper = [dataset["t"][-1], dataset["t"][-1]]
window_limits = [0, 10000]
p.line(window_bound_lower, window_limits, line_dash="dashed",
legend_label="Janela da epidemia", color="#455a64",
line_width=2, line_cap='round', line_alpha=0.2)
p.line(window_bound_upper, window_limits, line_dash="dashed",
legend_label="Janela da epidemia", color="#455a64",
line_width=2, line_cap='round', line_alpha=0.2)
p1.line(window_bound_lower, window_limits, line_dash="dashed",
legend_label="Janela da epidemia", color="#455a64",
line_width=2, line_cap='round', line_alpha=0.2)
p1.line(window_bound_upper, window_limits, line_dash="dashed",
legend_label="Janela da epidemia", color="#455a64",
line_width=2, line_cap='round', line_alpha=0.2)
p1.xaxis.axis_line_alpha = 0
p1.xaxis.major_label_text_color = None
p1.xaxis.major_tick_line_color = None
p1.xaxis.minor_tick_line_color = None
p1.yaxis.axis_label = 'Indivíduos'
p1.grid.grid_line_alpha = 0
p1.xgrid.band_fill_color = "navy"
p1.xgrid.band_fill_alpha = 0.1
p1.toolbar.autohide = True
show(column(p, p1, select))
Salvando as variáveis¶
[25]:
import pickle
save_variable = {
"data" : data,
"final" : final_data
}
with open('UK_estimated_data.pickle', 'wb') as handle:
pickle.dump(save_variable, handle)